#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
//#include <pthread.h>
#include <assert.h>
//#include <stdbool.h>
#include <netinet/in.h>
#include <sys/syscall.h>
#include "binary_protocol.h"
#include "general.h"
#include "mc.h"
#include "util.h"

#include <event2/event.h>
#include <event2/bufferevent.h>
#include <binary_protocol.h>
#include <pthread.h>
//#include <event2/listener.h>
//#include <event2/thread.h>

#define ENABLE_DBG (1)

/* threads */
pthread_mutex_t stats_mutex;
pthread_mutex_t fd_mutex;
int num_threads;
int warmup_time; //sec
//double execuation_time; //sec
time_t p_start, p_end;

/* sockets */
//int tcp_fd;
//int u_sock_fd;
int *sfds;
struct hostent *u_server;
struct sockaddr_in u_server_addr;
extern int **server_fds;

/* env variables */
char *key_prefix;
int num_connections;
int num_requests;
int num_keys;
int header_op;
int k_length;
int v_length;
//int poll_wait_sec;
int rec_interval;
int rec_round;
unsigned long seed;
char *config_file = NULL;
int  gen_fix_keys;
int  pure_get_mode;
int batch_size;

//struct event_base *main_base;

typedef struct cli_r_cb_arg_st {
	char *req_buf;
	char *resp_buf;
} cli_r_cb_arg;
cli_r_cb_arg *cb_arg_list;

typedef struct ev_timer_st {
	struct event *ev;
	struct timeval to;
	int timer_count;
	bool is_running;
} ev_timer;
ev_timer timer;

typedef struct libevent_thread_st {
	int sfd;
	pthread_t tid;
	int t_rank;
	struct event_base *base;
	//struct event *ev;
	struct bufferevent *bev;
	cli_r_cb_arg *cb_arg;
} ev_thread_t;

ev_thread_t *ev_threads;

typedef struct count_stats_st {
	long num_set_req;
	long num_set_resp_success;
	long num_set_resp_fail;
	long num_get_req;
	long num_get_resp_success;
	long num_get_resp_fail;
	uint8_t cmd_mode;

	long num_last_finished_get;

	pthread_mutex_t _lock;
} count_stats;
count_stats c_stats;

typedef struct k_v_pair_st {
	char key[MAX_KEY_LEN];
	char value[MAX_VAL_LEN];
	//size_t key_len;
	//size_t value_len;
} k_v_pair;

k_v_pair **k_v_pair_list;

const char key_text[] = "123456789";
const char value_text[] = "abcdefghijklmnopqrstuvwxyz";

void socket_ev_cb(struct bufferevent *bev, short events, void *arg);
void client_recv_cb(struct bufferevent *bev, void *arg);
void setup_timer(struct event_base *base, int sec);
int process_request(char *req_buf);

int gettid() {
	pid_t tid = syscall(SYS_gettid);
	return tid;
}

void display_stats(count_stats *stats) {
//	fprintf(stderr, "==== stats ====\n");
//	fprintf(stderr, "set(req=%ld suc=%ld fail=%ld) get(req=%ld suc=%ld fail=%ld)\n",
//			stats->num_set_req, stats->num_set_resp_success, stats->num_set_resp_fail,
//			stats->num_get_req, stats->num_get_resp_success, stats->num_set_resp_fail);
}

void init_default_env() {
	num_connections = DEFAULT_CONNECTIONS;
	num_requests = DEFAULT_REQUESTS;
	num_keys = DEFAULT_KEYS;
	//poll_wait_sec = DEFAULT_POLL_WAIT_SEC;
	seed = DEFAULT_SEED;
	header_op = DEFAULT_OP;
}

void usage(char *prog)
{
	printf("Usage hhh: %s [-c n] [-g n] [-i n] [-k n] [-r n] [-s s] [-t n] host port\n", prog);
	printf("\t-c n Open n connections (default: %d)\n",
		DEFAULT_CONNECTIONS);
	printf("\t-k n Number of keys in key space (default: %d)\n",
		DEFAULT_KEYS);
	printf("\t-r n Requests to send simultaneously (default: %d)\n",
		DEFAULT_REQUESTS);
	printf("\t-s s Seed to use for random number generation (default: %d)\n",
		DEFAULT_SEED);
	printf("\t-t n Time out requests after n seconds (default: %d)\n",
		DEFAULT_POLL_WAIT_SEC);
	exit(1);
}

/* Returns an integer argument from getopt */
int intarg(char *prog, char *arg)
{
	int x = atoi(arg);
	if (x == 0)
		usage(prog);
	return x;
}

void gen_random(const char *src, char *dst, int len, char *prefix) {
	uint32_t tmp_len = (size_t)len;
	uint32_t prefix_len = 0;
	if (prefix != NULL) {
		prefix_len = strlen(prefix);
		tmp_len -= prefix_len;
		strncpy(dst, prefix, strlen(prefix));
	}
	int i=0;
	for (i=0; i<tmp_len; i++) {
		dst[prefix_len+i] = src[rand()%(sizeof(src)-1)];
	}
}

void gen_mono_increase(char *dst, int len, char *prefix, long number) {
	uint32_t tmp_len = (uint32_t)len;
	//printf("------------------- tmp_len %d\n",tmp_len);
	uint32_t prefix_len = 0;
	char padding[1000], buf[1000];
	if (prefix != NULL) {
		prefix_len = strlen(prefix);
		tmp_len -= prefix_len;
		strncpy(dst, prefix, prefix_len);
	}
	int n = snprintf(NULL, 0, "%ld", number);
	//printf("------------------- n %d\n",n);
	assert(n > 0);
	uint32_t pad_len = tmp_len - n;
	memset(padding, '0', 1000);
	strncpy(dst+prefix_len, padding, pad_len);
	//printf("------------------- pad_len %d\n",pad_len);
	int c = snprintf(buf, n+1, "%ld", number); //use n+1 instead of n
	//printf("------------------- c %d\n",c);
	assert(buf[n] == '\0');
	assert(c == n);
	//printf("in dst1  ---------------------------%s,%d,%d,%d\n",dst,sizeof(&dst),prefix_len,pad_len);
	strncpy(dst+prefix_len+pad_len, buf, c+1);
	//printf("in dst  ---------------------------%s,%d,%d,%d\n",dst,strlen(dst),prefix_len,pad_len);
//	fprintf(stderr, "len prefix=%lu pad_len=%lu number=%ld =%d/%d\n", prefix_len, pad_len, number, n, c);
}

/*
void gen_mono_increase(char *dst, int len, char *prefix) {
	size_t tmp_len = (size_t)len;
	size_t prefix_len = 0;
	if (prefix != NULL) {
		prefix_len = strlen(prefix);
		tmp_len -= prefix_len;
		strncpy(dst, prefix, strlen(prefix));
	}
	const int n = snprintf(NULL, 0, "%lu", global_key_seq_no);
	assert(n > 0);
	int pad_len = len - prefix_len - n;
	char padding[pad_len], buf[n+1];
	memset(padding, '0', pad_len);
	strncpy(dst+prefix_len, padding, pad_len);
	int c = snprintf(buf, n+1, "%lu", global_key_seq_no);
	assert(buf[n] == '\0');
	assert(c == n);
	strncpy(dst+prefix_len+pad_len, buf, c);
}*/

void init_k_v_pair_list(int num_keys, char *prefix) {
	k_v_pair_list = (k_v_pair **)malloc(num_keys * sizeof(k_v_pair *));
	long global_key_seq_no = 1;
		int i=0;
	for ( i=0; i<num_keys; i++) {
		k_v_pair_list[i] = (k_v_pair *)malloc(sizeof(k_v_pair));
		/*
		//pre-fill
		memset(k_v_pair_list[i]->key, 'a', k_length);
		memset(k_v_pair_list[i]->value, 'b', v_length);

		size_t kl = snprintf(NULL, 0, "%s%d", prefix, i);
		sprintf(k_v_pair_list[i]->key, "%s%d", prefix, i);
		sprintf(k_v_pair_list[i]->key + kl, "testkey");

		size_t vl = snprintf(NULL, 0, "value%d", i);
		sprintf(k_v_pair_list[i]->value, "test%dvalue%d", i, i);
		 */
//printf("in key_length,v_length  ---------------------------%d,%d\n",k_length,v_length);
		if (gen_fix_keys) {
			gen_mono_increase(k_v_pair_list[i]->key, k_length, prefix, global_key_seq_no);
			gen_mono_increase(k_v_pair_list[i]->value, v_length, NULL, global_key_seq_no);
			global_key_seq_no++;
		} else {
			gen_random(key_text, k_v_pair_list[i]->key, k_length, prefix);
			gen_random(value_text, k_v_pair_list[i]->value, v_length, NULL);
		}
		if (ENABLE_DBG) {
//			fprintf(stderr, "init [%d] k=%s v=%s\n", i, k_v_pair_list[i]->key,
//					k_v_pair_list[i]->value);
		}
//		printf("-----------------------------------key,value:%s,%s\n",k_v_pair_list[i]->key,k_v_pair_list[i]->value);

	}
//	fprintf(stderr, "finish init k_v pair list k_length=%d v_length=%d pairs=%d\n",
//			k_length, v_length, num_keys);
}

void release_k_v_pair_list(int num_keys) {
	if (NULL == k_v_pair_list)
		exit(EXIT_FAILURE);
			int i=0;
	for ( i=0; i<num_keys; i++) {
		if (NULL != k_v_pair_list[i])
			free(k_v_pair_list[i]);
	}
	free(k_v_pair_list);
}

char *get_value_by_key(char *key, int k_len) {
		int i=0;
	for (i=0; i<num_keys; i++) {
		if (0 == strncmp(key, k_v_pair_list[i]->key, k_len)) {
			return k_v_pair_list[i]->value;
		}
	}
	return NULL;
}

void dump_buffer_content(const char *context, char *buf, int len, int mode) {
	fprintf(stderr, "\n< %s dump start  ", context);
		int i=0;
	for ( i=0; i<len; i++) {
		if (0==i%4)
			fprintf(stderr, "\n dump > ");
		if (1 == mode) { //ascii mode
			fprintf(stderr, "0x%02x ", buf[i]);
		} else { //char mode
			fprintf(stderr, "%c ", buf[i]);
		}
	}
	fprintf(stderr, "  dump end >\n\n");
}

int get_header_op()
{
	return header_op;
}

void ignite(ev_thread_t *et) {
	// ignite the first request
	int req_len = process_request(et->cb_arg->req_buf);
	int ret = bufferevent_write(et->bev, et->cb_arg->req_buf, (size_t) req_len);
	assert(ret == 0);
//	fprintf(stderr, "[%d]ignite first request\n", et->t_rank);
}

void process_response(struct event_base *eb, struct bufferevent *bev, char *buf, int len) {
	int parsed_bytes = 0;
	binary_header_t *h;
	uint16_t k_len;
	uint32_t body_len;
	uint8_t ext_len;
	int v_len;
	char *exp_get_value;
	int op_type;
	//long tmp_get_finished;
	while (parsed_bytes < len) {
		if(get_header_op() == 1)
			parsed_bytes += 24;
		h = (binary_header_t *)(buf+parsed_bytes);
		k_len = ntohs(h->key_len);
		body_len = ntohl(h->body_len);
		ext_len = h->extra_len;
		v_len = body_len - k_len - ext_len;
		op_type = h->opcode;
		if (CMD_GETK == op_type) {
			//printf("in CMD_TETK\n");
			assert(ext_len == 4 && v_len > 0);
			exp_get_value = get_value_by_key(buf+parsed_bytes + 24+ext_len, k_len);
			assert(exp_get_value != NULL);
			if (0 == strncmp(buf+parsed_bytes+ 24+ext_len+k_len, exp_get_value, (size_t) v_len)) {
				if (ENABLE_DBG) {
//					fprintf(stderr,  "[%d]success find k_v pair\n", gettid());
				}

				pthread_mutex_lock(&c_stats._lock);
				c_stats.num_get_resp_success++;
				pthread_mutex_unlock(&c_stats._lock);

				//if (!timer.is_running /*|| c_stats.num_get_resp_success >= 4*/) {
				if (!timer.is_running) {
					c_stats.cmd_mode = CMD_NOOP;
					timer.is_running = false;
					if (ENABLE_DBG) {
//						fprintf(stderr, "[%d]finish GETKs, exit program\n", gettid());
					}
					//display_stats();
					bufferevent_free(bev);
				}

			} else {
				if (ENABLE_DBG) {
					fprintf(stderr, "[%d]ERROR find k_v pair <dump ext=%d k_len=%d v_len=%d\n",
							gettid(), ext_len, k_len, v_len);
					dump_buffer_content("GETK_h", buf, 24, 1); //print header
					dump_buffer_content("GETK_b", buf+parsed_bytes, body_len-ext_len, 0); //print body w/o extra
				}
				pthread_mutex_lock(&c_stats._lock);
				c_stats.num_get_resp_fail++;
				pthread_mutex_unlock(&c_stats._lock);
			}
			parsed_bytes += 24+body_len;

		} else if (CMD_MGET == op_type) {

			//printf("in CMD_MGET\n");
			dump_buffer_content("MGET_h", buf, 24, 1);
			parsed_bytes += 24;
				int i=0;
			for (i=0; i<ext_len; i++) {
				h = (binary_header_t *)(buf + parsed_bytes);
				body_len = ntohl(h->body_len);
				dump_buffer_content("MGET_h_h", buf+parsed_bytes, 24, 1); //print header
				parsed_bytes += 24 + 4;
				dump_buffer_content("MGET_h_b", buf+parsed_bytes, body_len-4, 0); //print body w/o extra
				parsed_bytes += body_len-4;
			}
			pthread_mutex_lock(&c_stats._lock);
			c_stats.num_get_resp_success++;
			pthread_mutex_unlock(&c_stats._lock);

			//if (!timer.is_running /*|| c_stats.num_get_resp_success >= 1*/) {
			if (!timer.is_running) {
				c_stats.cmd_mode = CMD_NOOP;
				timer.is_running = false;
				if (ENABLE_DBG) {
//					fprintf(stderr, "[%d]finish MGET, exit program\n", gettid());
				}
				//display_stats();
				bufferevent_free(bev);
			}

		} else if (CMD_SET == op_type) {
	
			//printf("in CMD_SET\n");
			assert(ext_len == 0 && body_len == 0);
			if (0 == ntohs(h->status)) {

				pthread_mutex_lock(&c_stats._lock);
				c_stats.num_set_resp_success++;
				pthread_mutex_unlock(&c_stats._lock);
				if (ENABLE_DBG) {
//					fprintf(stderr, "[%d]op=%d success\n", gettid(), op_type);
				}
			} else {
				if (ENABLE_DBG) {
					fprintf(stderr, "[%d]op=%d fail code=%d\n", gettid(), op_type, ntohs(h->status));
				}

				pthread_mutex_lock(&c_stats._lock);
				c_stats.num_set_resp_fail++;
				pthread_mutex_unlock(&c_stats._lock);
			}
			parsed_bytes += 24;

			if (c_stats.num_set_resp_success == num_keys) {
				if (gen_fix_keys) {
					c_stats.cmd_mode = CMD_NOOP;
					fprintf(stderr, "[%d]finish SETs, exit program\n", gettid());
					display_stats(&c_stats);
					bufferevent_free(bev);
					printf("finish the free():\n");
				} else {
					c_stats.cmd_mode = CMD_GETK;
					timer.is_running = true;
					setup_timer(eb, rec_interval);
						int t=0;
					for (t = 0; t < num_threads; t++) {
						ignite(&ev_threads[t]);
					}
					fprintf(stderr, "[%d]finish SETs, start GETKs and setup timer\n", gettid());
				}
			}
			//fprintf(stderr, "parsed=%d len=%d\n", parsed_bytes, len);

		} else {
			//fprintf(stderr, "[%d]get unexpected response\n", gettid());
		}

	}
	//fprintf(stderr, "finish process_response\n");
}

int process_request(char *req_buf) {
	int key_idx;
	int req_body_len = 0;
    int header_op = get_header_op();
	if (c_stats.cmd_mode == CMD_SET) {
		pthread_mutex_lock(&c_stats._lock);
		key_idx = (int) c_stats.num_set_req;
		c_stats.num_set_req++;
		pthread_mutex_unlock(&c_stats._lock);
		compose_binary_set(req_buf, k_v_pair_list[key_idx]->key,
						   k_v_pair_list[key_idx]->value, &req_body_len);
		if(header_op==1)
			req_body_len += 40;
		else
			req_body_len += 32;

	} else if (c_stats.cmd_mode == CMD_GETK) {
		pthread_mutex_lock(&c_stats._lock);
		key_idx = rand() % num_keys;
		c_stats.num_get_req++;
		pthread_mutex_unlock(&c_stats._lock);

		req_body_len = compose_binary_get(req_buf, k_v_pair_list[key_idx]->key, CMD_GETK, 0, 0);
	} else if (c_stats.cmd_mode == CMD_MGET) {
		pthread_mutex_lock(&c_stats._lock);
		c_stats.num_get_req += batch_size;
		pthread_mutex_unlock(&c_stats._lock);
		char **key_list = (char **)malloc(sizeof(char *)*batch_size);
			int i=0;
		for ( i=0; i<batch_size; i++) {
			key_idx = rand() % num_keys;
			key_list[i] = k_v_pair_list[key_idx]->key;
		}
		req_body_len = (int) binary_mget_req(req_buf, key_list, batch_size);
		//compose_binary_mget_req_h(req_buf, batch_size, (size_t) (req_body_len - 24));
		dump_buffer_content("MGET_req", req_buf, req_body_len, 1);
		free(key_list);

	} else {
		//skip for now
	}
	if (ENABLE_DBG) {
//		fprintf(stderr, "[%d]send op=%d len=%d\n", gettid(), c_stats.cmd_mode, req_body_len);
	}
	return req_body_len;
}

void timer_cb(int sock, short which, void *arg) {
	//if (!evtimer_pending(timer.ev, NULL)) {
	//printf("\n\n\n timer.timer_count :%d\n\n\n",timer.timer_count);
       // printf("\n\n\n rec_round :%d\n\n\n",rec_round);

		if (timer.timer_count < rec_round) {
			timer.timer_count++;
			double rps;
			pthread_mutex_lock(&c_stats._lock);
			rps = c_stats.num_get_resp_success - c_stats.num_last_finished_get;
			c_stats.num_last_finished_get = c_stats.num_get_resp_success;
			pthread_mutex_unlock(&c_stats._lock);

				rps = 1.0 * rps/timer.to.tv_sec;
			fprintf(stdout, "#[%d]round=%d get_op_rps %.1f\n", gettid(), timer.timer_count, rps);
			fflush(stdout);
			//evtimer_add(timer.ev, &timer.to);
		} else {
			timer.is_running = false;
			event_del(timer.ev);
//			fprintf(stderr, "[%d]exit timer_cb\n", gettid());
			display_stats(&c_stats);
		}
	//}
}

void setup_timer(struct event_base *base, int sec) {
	timer.to.tv_sec = sec;
	timer.to.tv_usec = 0;
	timer.ev = event_new(base, -1, EV_PERSIST, timer_cb, NULL);
	evtimer_add(timer.ev, &timer.to);

	time(&p_end);
	int elapsed_time = (int) difftime(p_start, p_end);
	if (elapsed_time < warmup_time && !pure_get_mode) {
		sleep((unsigned int) (warmup_time - elapsed_time));
	}
	fprintf(stderr, "[%d]setup timer sleep %d sec\n", gettid(), warmup_time-elapsed_time);
}

void init_ev_thread(int sock_fd, int idx) {
	ev_threads[idx].sfd = sock_fd;
	ev_threads[idx].t_rank = idx;
	ev_threads[idx].base = event_base_new();
	assert(NULL != ev_threads[idx].base);
	ev_threads[idx].cb_arg = &cb_arg_list[idx];
	evutil_make_socket_nonblocking(ev_threads[idx].sfd);
	ev_threads[idx].bev = bufferevent_socket_new(ev_threads[idx].base, ev_threads[idx].sfd, BEV_OPT_CLOSE_ON_FREE);
	bufferevent_setcb(ev_threads[idx].bev, client_recv_cb, NULL, socket_ev_cb, (void *)&ev_threads[idx]);
	bufferevent_enable(ev_threads[idx].bev, EV_READ|EV_PERSIST);
}

void *worker_thread(void *arg) {
	ev_thread_t *ev_t = (ev_thread_t *)arg;
	ev_t->tid = pthread_self();

//	fprintf(stderr, "tid[%d] r=%d dispatching\n", gettid(), ev_t->t_rank);
	if (!gen_fix_keys || pure_get_mode || 0 == ev_t->t_rank) {
		event_base_dispatch(ev_t->base);
	}
	event_base_free(ev_t->base);
//	fprintf(stderr, "tid[%d] r=%d free\n", gettid(), ev_t->t_rank);
	return (void *)5;
}

int main(int argc, char **argv) {
	//pthread_t thr;
	char *host;
	int port;
	int i;
	char random_state[16];
	int c;
	pthread_attr_t attr;
	int rc;
	void *status;

	//ev_thread_t main_ev;

	time(&p_start);
	init_default_env();

	while ((c = getopt(argc, argv, "c:k:r:u:x:a:f:v:l:i:n:t:w:m:g:b:h:")) != EOF) {
		switch (c) {
			case 'a':
				use_ascii_protocol = atoi(optarg) ? true : false;
				break;
			case 'c':
				num_connections = intarg(argv[0], optarg);
				break;
			case 'k':
				num_keys = intarg(argv[0], optarg);
				break;
			case 'v':
				v_length = intarg(argv[0], optarg);
				break;
			case 'l':
				k_length = intarg(argv[0], optarg);
				break;
			case 'r':
				num_requests = intarg(argv[0], optarg);
				break;
			case 's':
				seed = intarg(argv[0], optarg);
				break;
			case 'i':
				//poll_wait_sec = intarg(argv[0], optarg);
				rec_interval = intarg(argv[0], optarg);
				break;
			case 'n':
				rec_round = intarg(argv[0], optarg);
				break;
			case 'u':
				enable_udp = atoi(optarg) ? true : false;
				break;
			case 'x':
				key_prefix = strdup(optarg);
				break;
			case 'f':
				config_file = strdup(optarg);
				break;
			case 't':
				num_threads = intarg(argv[0], optarg);
				break;
			case 'w':
				warmup_time = intarg(argv[0], optarg);
				break;
			case 'm':
				gen_fix_keys = atoi(optarg) ? true : false;
				break;
			case 'g':
				pure_get_mode = atoi(optarg) ? true : false;
				break;
			case 'b':
				batch_size = atoi(optarg);
				break;
			case 'h':
				header_op =  atoi(optarg) ? true : false;
				break;
			default:
				usage(argv[0]);
				break;
		}
	}
//printf("key_length,v_length  ---------------------------%d,%d\n",k_length,v_length);
	initstate(seed, random_state, sizeof(random_state));
	if (NULL != config_file) {
		parse_ini_file(config_file);
	} else {
		exit(EXIT_FAILURE);
	}

	/* at least has host, port parameters */
	if (argc < optind + 2)
		usage(argv[0]);

	host = argv[optind++];
	port = intarg(argv[0], argv[optind]);
	
	

	/*
	if (enable_udp) {
		//create_udp_connection(host, port);
		exit(EXIT_FAILURE);
	} else {
		tcp_fd = clientsock(host, port);
		//set_nodelay(tcp_fd, 0);
	}
	int sfd = (enable_udp)? u_sock_fd : tcp_fd;
	 */

	sfds = (int *)malloc(sizeof(int)*num_threads);
	ev_threads = (ev_thread_t *)malloc(sizeof(ev_thread_t) * num_threads);
	cb_arg_list = (cli_r_cb_arg *)malloc(sizeof(cli_r_cb_arg) * num_threads);
	//cli_r_cb_arg r_cb_arg;
	//r_cb_arg.req_buf = (char *)malloc(sizeof(char) * MAX_SET_BUF_SIZE);
	//r_cb_arg.resp_buf = (char *)malloc(sizeof(char) * MAX_SET_BUF_SIZE);

	if (NULL == sfds || NULL == ev_threads || NULL == cb_arg_list)
		exit(EXIT_FAILURE);

	init_k_v_pair_list(num_keys, key_prefix); //shared by all threads

	pthread_mutex_init(&c_stats._lock, NULL);
	int t=0;
	for ( t=0; t<num_threads; t++) {
		sfds[t] = clientsock(host, port);
		cb_arg_list[t].req_buf = (char *)malloc(sizeof(char) * MAX_SET_BUF_SIZE);
		cb_arg_list[t].resp_buf = (char *)malloc(sizeof(char) * MAX_SET_BUF_SIZE);
		init_ev_thread(sfds[t], t);
		//fprintf(stderr, "t[%d]=%d init\n", t, gettid());
	}
//	fprintf(stderr, "finish init thread info\n");

	if (pure_get_mode) {
		if (batch_size > 0) {
			c_stats.cmd_mode = CMD_MGET;
		} else {
			c_stats.cmd_mode = CMD_GETK; //init set first
		}
		timer.is_running = true;
		setup_timer(ev_threads[0].base, rec_interval);
		
		for ( t = 0; t < num_threads; t++) {
			ignite(&ev_threads[t]);
		}
//		fprintf(stderr, "pure_get_mode ignite first gets\n");
	} else {
		c_stats.cmd_mode = CMD_SET; //init set first
		ignite(&ev_threads[0]);
//		fprintf(stderr, "ignite first SET on first thread\n");
	}

	pthread_attr_init(&attr);
	pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
	for (t=0; t<num_threads; t++) {
		rc = pthread_create(&ev_threads[t].tid, NULL, worker_thread, (void *)&ev_threads[t]);
		assert(rc == 0);
	}
//	fprintf(stderr, "finish create threads\n");

	/*
	for (int i=0; i<num_keys; i++) {
		if (use_ascii_protocol) {
			ascii_set_request(sfd, k_v_pair_list[i]->key_list, k_v_pair_list[i]->value_list, v_length);
			ascii_parse_set_response(sfd);
		} else {
			binary_set_or_noop(k_v_pair_list[i]->key_list, k_v_pair_list[i]->value_list, false);
		}
		fprintf(stderr, "finish set <%s> <%s>\n", k_v_pair_list[i]->key_list,
				k_v_pair_list[i]->value_list);
	}

	char temp_buf[MAX_GET_BUF_SIZE];
	for (int i=0; i<2; i++) {
		memset(temp_buf, 0, MAX_GET_BUF_SIZE);
		size_t nbytes = binary_mget_req(temp_buf, key_list_ptr+ i*num_keys/2, num_keys/2);
		send_request(tcp_fd, temp_buf, nbytes);
		memset(temp_buf, 0, MAX_GET_BUF_SIZE);
		ssize_t rbytes = recv_request(tcp_fd, temp_buf, MAX_GET_BUF_SIZE);
		fprintf(stderr, "\n\n...finish %d mget...\n\n", i+1);
	}

	binary_set_or_noop(key_list[num_keys-1], value_list[num_keys-1], false);
	free(key_list_ptr);
	fprintf(stderr, "finish test\n");
	 */

	//main_base = event_base_new(); //main thread event_base
	//fprintf(stderr, "try create new main_base\n");
	//assert(NULL != main_base);

	srand((unsigned int) time(NULL));
	/*
	struct bufferevent *bev;
	evutil_make_socket_nonblocking(sfd);
	bev = bufferevent_socket_new(base, sfd, BEV_OPT_CLOSE_ON_FREE);
	bufferevent_setcb(bev, client_recv_cb, NULL, socket_ev_cb, (void *)&r_cb_arg);
	bufferevent_enable(bev, EV_READ|EV_PERSIST);
	 */

	//event_base_dispatch(main_base);
	//fprintf(stderr, "finish main thread dispatch\n");

	for ( t=0; t<num_threads; t++) {
		rc = pthread_join(ev_threads[t].tid, &status);
		assert(rc == 0);
//		fprintf(stderr, "success join thread[%d]\n", ev_threads[t].t_rank);
	}

	//event_base_free(main_base);

	if (NULL != cb_arg_list) {
		for ( t=0; t<num_threads; t++) {
			if (NULL != cb_arg_list[t].req_buf)
				free(cb_arg_list[t].req_buf);
			if (NULL != cb_arg_list[t].resp_buf)
				free(cb_arg_list[t].resp_buf);
		}
		free(cb_arg_list);
	}

	release_k_v_pair_list(num_keys);

	pthread_mutex_destroy(&c_stats._lock);

	if (sfds != NULL)
		free(sfds);

	if (ev_threads != NULL)
		free(ev_threads);

	//if (bev != NULL)
	//	free(bev);
//	fprintf(stderr, "finish free base\n");

	return 0;
}

void socket_ev_cb(struct bufferevent *bev, short events, void *arg) {
	evutil_socket_t fd = bufferevent_getfd(bev);
//	fprintf(stderr, "socket_event_cb fd=%u\n", fd);
	if (events & BEV_EVENT_EOF) {
//		fprintf(stderr, "connection closed\n");
	} else if (events & BEV_EVENT_ERROR) {
		fprintf(stderr, "some other error\n");
	} else if (events & BEV_EVENT_TIMEOUT) {
		fprintf(stderr, "time out error\n");
	}
	bufferevent_free(bev);
}

void client_recv_cb(struct bufferevent *bev, void *arg) {
	//cli_r_cb_arg *cb_arg = (cli_r_cb_arg *)arg;
	ev_thread_t *e = (ev_thread_t *)arg;
	ssize_t len = bufferevent_read(bev, e->cb_arg->resp_buf, 1024);
	//printf("\n\nget len:%zu\n\n",len);
	assert(len >=24);
	e->cb_arg->resp_buf[len] = '\0';
	process_response(e->base, bev, e->cb_arg->resp_buf, (int) len);
	//while (CMD_GETK == stats.cmd_mode && stats.num_get_req < stats.num_get_resp_success+5) {
	int req_len = process_request(e->cb_arg->req_buf);
	int ret = bufferevent_write(bev, e->cb_arg->req_buf, (size_t) req_len);
	assert(ret == 0);
	//}

	//sleep(1);
}
